package drds.data_propagate.parse;

import drds.data_propagate.binlog_event.protogenesis.AbstractPacketReader;
import drds.data_propagate.driver.socket.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedByInterruptException;

/**
 * 基于socket的logEvent实现
 */
public class PacketReader extends AbstractPacketReader {

    // Master heartbeat interval
    public static final int master_heartbeat_period_seconds = 15;
    /**
     * Command decode dump protogenesis
     */
    public static final byte command_binlog_dump = 18;
    /**
     * Packet headerPacket sizes
     */
    public static final int header_size = 4;
    public static final int sql_state_length = 5;
    /**
     * Packet offsets
     */
    public static final int packet_length_offset = 0;
    public static final int packet_sequence_offset = 3;
    /**
     * Maximum packet length
     */
    public static final int max_packet_length = (256 * 256 * 256 - 1);
    protected static final Logger logger = LoggerFactory.getLogger(PacketReader.class);
    // +10s 确保 timeout > heartbeat interval
    private static final int read_timeout_milliseconds = (master_heartbeat_period_seconds + 10) * 1000;
    private SocketChannel socketChannel;

    private boolean semi = false;

    // private BufferedInputStream input;

    public PacketReader() {
        super(default_initial_capacity, default_growth_factor);
    }

    public PacketReader(final int initialCapacity) {
        super(initialCapacity, default_growth_factor);
    }

    public PacketReader(final int initialCapacity, final float growthFactor) {
        super(initialCapacity, growthFactor);
    }

    public void start(SocketChannel socketChannel) throws IOException {
        this.socketChannel = socketChannel;
        String semi = System.getProperty("dataBaseName.semi");
        if ("1".equals(semi)) {
            this.semi = true;
        }
    }


    public boolean fetchNextBinlogEvent() throws IOException {
        try {
            // Fetching packet headerPacket from input.
            if (!read(0, header_size)) {
                logger.warn("Reached end of input stream while fetching headerPacket");
                return false;
            }

            // Fetching the first packet(may a multi-packet).
            int packet_length = getLittleEndian24UnsignedInt(packet_length_offset);
            int sequence = get8UnsignedInt(packet_sequence_offset);
            if (!read(header_size, packet_length)) {
                logger.warn("Reached end of input stream: packet #" + sequence + ", len = " + packet_length);
                return false;
            }

            // Detecting error code.
            final int mark = get8UnsignedInt(header_size);
            if (mark != 0) {
                if (mark == 254) {
                    // Indicates end of stream. It's not clear executeTimeStamp this would
                    // be sent.
                    logger.warn("Received EOF packet from server, apparent"
                            + " master disconnected. It's may be duplicateFromEffectiveInitialIndexOffset slaveId , check task config");
                    return false;
                } else if (mark == 255) // error from master
                {
                    // Indicates an error, for example trying decode fetchNextBinlogEvent from
                    // wrong
                    // protogenesis readedIndex.
                    readedIndex = header_size + 1;
                    final int errorNo = getNextLittleEndian16SignedInt();
                    String sqlState = forward(1).getFixLengthStringWithNullTerminateCheck(sql_state_length);
                    String errorMessage = getFixLengthStringWithNullTerminateCheck(limit - readedIndex);
                    throw new IOException("Received error packet:" + " errno = " + errorNo + ", sqlstate = " + sqlState
                            + " errmsg = " + errorMessage);
                } else {
                    // Should not happen.
                    throw new IOException("Unexpected response " + mark + " while fetching protogenesis: packet #" + sequence
                            + ", len = " + packet_length);
                }
            }

            // if mysql is in semi mode
            if (semi) {
                // parse semi mark
                @SuppressWarnings("unused")
                int semimark = get8UnsignedInt(header_size + 1);
                int semiValue = get8UnsignedInt(header_size + 2);
                this.semiValue = semiValue;
            }

            // The first packet is a multi-packet, concatenate the packets.
            while (packet_length == max_packet_length) {
                if (!read(0, header_size)) {//使用最开始的字节来存储
                    logger.warn("Reached end of input stream while fetching headerPacket");
                    return false;
                }

                packet_length = getLittleEndian24UnsignedInt(packet_length_offset);
                sequence = get8UnsignedInt(packet_sequence_offset);
                if (!read(limit, packet_length)) {//追加数据
                    logger.warn("Reached end of input stream: packet #" + sequence + ", len = " + packet_length);
                    return false;
                }
            }

            // Preparing bytes variables decode decoding.
            if (semi) {
                effectiveInitialIndex = header_size + 3;
            } else {
                effectiveInitialIndex = header_size + 1;
            }
            readedIndex = effectiveInitialIndex;
            limit -= effectiveInitialIndex;
            return true;
        } catch (SocketTimeoutException e) {
            close(); /* Do cleanup */
            logger.error("Socket timeout expired, closing dumper", e);
            throw e;
        } catch (InterruptedIOException e) {
            close(); /* Do cleanup */
            logger.info("I/O interrupted while reading from client socket", e);
            throw e;
        } catch (ClosedByInterruptException e) {
            close(); /* Do cleanup */
            logger.info("I/O interrupted while reading from client socket", e);
            throw e;
        } catch (IOException e) {
            close(); /* Do cleanup */
            logger.error("I/O error while reading from client socket", e);
            throw e;
        }
    }

    private final boolean read(final int offset, final int length) throws IOException {
        int minCapacity = offset + length;
        ensureCapacity(minCapacity);

        socketChannel.read(bytes, offset, length, read_timeout_milliseconds);
        if (limit < offset + length) {
            limit = offset + length;
        }
        return true;
    }

    public void close() throws IOException {
        // do nothing
    }

}
